package com.zhanghe.study.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

/**
 * @author zh
 * @date 2023/7/28 14:45
 */
public class TestCurator {

    // 连接地址
    private final String connect = "localhost:2181";

    // 单位是毫秒
    private final int sessionTimeOut = 2000;
    private final int connectionTimeout = 2000;

    private CuratorFramework zkClient;

    @Before
    public void init() {
        // baseSleepTimeMs 初始sleep时间
        // maxRetries 最大重试次数

        // ExponentialBackoffRetry重试策略是，给定一个初始sleep时间，在这个基础上结合重试次数，计算当前需要sleep的时间
        // 当前sleep时间 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
        // connectString  连接地址，如果是集群的话则使用逗号拼接  如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
        // sessionTimeOut session过期时间，单位毫秒
        // connectionTimeout  连接创建超时时间，单位毫秒
        zkClient = CuratorFrameworkFactory
                .builder()
                .connectString(connect)
                .sessionTimeoutMs(sessionTimeOut)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retry)
                .namespace("base") // 用于进行命名空间隔离,也就是根目录为设置的命名空间名称
                .build();
        // 只有start之后才会执行客户端请求
        zkClient.start();
    }

    // 创建节点
    @Test
    public void createNode() throws Exception {

        // path    路径
        // data    数据

        // 创建模式，默认是持久节点
        zkClient.create()
                .withMode(CreateMode.PERSISTENT) // 节点类型
                .forPath("/curator_test","第一次创建节点".getBytes());

        // 递归创建
        zkClient.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT) // 节点类型
                .forPath("/cup/ch","第一次创建节点".getBytes());

    }

    // 设置节点数据
    @Test
    public void setNodeData() throws Exception {
        // path  路径
        // data  the data to set  数据
        // version  所期待的版本，相当于乐观锁，如果是-1，则匹配所有版本
        zkClient.setData()
                .withVersion(-1)
                .forPath("/curator_test","修改节点数据".getBytes());

    }

    // 获取节点数据
    @Test
    public void getNodeData() throws Exception {

        Stat stat = new Stat();
        //path the given path 路径
        //stat the stat of the node  获取状态信息
        byte[] data = zkClient.getData()
                .storingStatIn(stat) // 获取stat
                .forPath("/curator_test");
        System.out.println(new String(data));
        System.out.println(stat);
    }

    // 获取子节点
    @Test
    public void getChildNodes() throws Exception {

        CuratorCache cache = CuratorCache.build(zkClient, "/test_watch", CuratorCache.Options.SINGLE_NODE_CACHE);

        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forCreatesAndChanges(
                        // 添加或修改缓存中的数据时调用
                        new CuratorCacheListenerBuilder.ChangeListener() {
                            @Override
                            public void event(ChildData oldNode, ChildData node) {
                                System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",
                                        oldNode, node);
                            }
                        })
                .forTreeCache(zkClient, new TreeCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                        ChildData eventData = event.getData();
                        if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){
                            System.out.printf("[forTreeCache] : NODE_ADDED  path: [%s] data:[%s]\n", eventData.getPath(),
                                    new String(eventData.getData())
                            );

                        } else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){
                            System.out.printf("[forTreeCache] : NODE_UPDATED  path: [%s] data:[%s]\n", eventData.getPath(),
                                    new String(eventData.getData())
                            );
                        } else if(event.getType() == TreeCacheEvent.Type.NODE_REMOVED){
                            System.out.printf("[forTreeCache] : NODE_REMOVED  path: [%s] \n", eventData.getPath()
                            );
                        }
                    }

//                    @Override
//                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
//                        ChildData eventData = event.getData();
//                        if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED){
//                            System.out.printf("[forPathChildrenCache] : CHILD_ADDED  path: [%s] data:[%s]\n", eventData.getPath(),
//                                    new String(eventData.getData())
//                            );
//
//                        } else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){
//                            System.out.printf("[forPathChildrenCache] : CHILD_UPDATED  path: [%s] data:[%s]\n", eventData.getPath(),
//                                    new String(eventData.getData())
//                            );
//                        } else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED){
//                            System.out.printf("[forPathChildrenCache] : CHILD_REMOVED  path: [%s] \n", eventData.getPath()
//                            );
//                        } else {
//                            System.out.println(event);
//                        }
//                    }
                })
                .forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))
                .build();
        // 给CuratorCache实例添加监听器
        cache.listenable().addListener(listener);
        // 启动
        cache.start();


        // 监听打印内容 [forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/test_watch', stat=975,975,1690535494382,1690535494382,0,0,0,0,9,0,975
        //, data=[49, 50, 55, 46, 48, 46, 48, 46, 49]}]
        zkClient.create().forPath("/test_watch");
        Thread.sleep(1000);
        // 监听打印内容 [forTreeCache] : NODE_ADDED  path: [/test_watch] data:[127.0.0.1]
        // [forCreatesAndChanges] : Node changed: Old: [ChildData{path='/test_watch', stat=975,975,1690535494382,1690535494382,0,0,0,0,9,0,975
        //, data=[49, 50, 55, 46, 48, 46, 48, 46, 49]}] New: [ChildData{path='/test_watch', stat=975,977,1690535494382,1690535496400,1,1,0,0,2,1,976
        //, data=[49, 49]}]
        zkClient.create().creatingParentsIfNeeded().forPath("/test_watch/c1");
        Thread.sleep(1000);
        //监听打印内容 [forTreeCache] : NODE_UPDATED  path: [/test_watch] data:[11]
        zkClient.setData().forPath("/test_watch","11".getBytes());
        Thread.sleep(1000);
        // 监听打印内容 [forTreeCache] : NODE_REMOVED  path: [/test_watch]
        zkClient.delete().forPath("/test_watch/c1");
        Thread.sleep(1000);
        // 监听打印内容 [forDeletes] : Node deleted: data: [ChildData{path='/test_watch', stat=975,977,1690535494382,1690535496400,1,1,0,0,2,1,976
        //, data=[49, 49]}]
        zkClient.delete().forPath("/test_watch");
        Thread.sleep(3000);


    }

    // 删除节点
    @Test
    public void deleteNode() throws Exception {
        zkClient.delete()
                .forPath("/curator_test");

        // 使用deletingChildrenIfNeeded可以逐层删除节点
        zkClient.delete()
                .deletingChildrenIfNeeded()
                .forPath("/cup");
    }
}
